package com.uyuni.rpc.registry;

import com.alibaba.fastjson.JSON;
import com.uyuni.rpc.common.exception.remoting.RemotingSendRequestException;
import com.uyuni.rpc.common.exception.remoting.RemotingTimeoutException;
import com.uyuni.rpc.common.loadbalance.LoadBalanceStrategy;
import com.uyuni.rpc.common.protocol.UyuniProtocol;
import com.uyuni.rpc.common.rpc.RegisterMeta;
import com.uyuni.rpc.common.rpc.RegisterMeta.Address;
import com.uyuni.rpc.common.rpc.ServiceReviewState;
import com.uyuni.rpc.common.transport.body.*;
import com.uyuni.rpc.common.utils.PersistUtils;
import com.uyuni.rpc.registry.model.RegistryPersistRecord;
import com.uyuni.rpc.registry.model.RegistryPersistRecord.PersistProviderInfo;
import com.uyuni.rpc.transport.model.RemotingTransporter;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import io.netty.util.internal.ConcurrentSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static com.uyuni.rpc.common.serialization.SerializerHolder.serializerImpl;

/**
 * Provider-side management for the service registry center.
 *
 * @author BazingaLyn
 * @time 2016-08-15
 */
public class RegistryProviderManager implements RegistryProviderServer {

    private static final Logger logger = LoggerFactory.getLogger(RegistryProviderManager.class);

    private static final AttributeKey<ConcurrentSet<String>> S_SUBSCRIBE_KEY = AttributeKey.valueOf("server.subscribed");
    private static final AttributeKey<ConcurrentSet<RegisterMeta>> S_PUBLISH_KEY = AttributeKey.valueOf("server.published");

    private DefaultRegistryServer defaultRegistryServer;

    // Global registration info: service name -> (provider address -> registration metadata)
    private final ConcurrentMap<String/*serverName*/, ConcurrentMap<RegisterMeta.Address, RegisterMeta>> globalRegisterInfoMap = new ConcurrentHashMap<>();
    // Which services each provider address (node) has registered
    private final ConcurrentMap<Address, ConcurrentSet<String/*serverName*/>> globalServiceMetaMap = new ConcurrentHashMap<>();
    // For each service, the set of consumer channels subscribed to it
    private final ConcurrentMap<String, ConcurrentSet<Channel>> globalConsumerMetaMap = new ConcurrentHashMap<>();
    // The channel associated with each provider address
    private final ConcurrentMap<Address, Channel> globalProviderChannelMetaMap = new ConcurrentHashMap<>();
    // Persisted history record of each service, keyed by service name
    private final ConcurrentMap<String, RegistryPersistRecord> historyRecords = new ConcurrentHashMap<>();
    // Load-balance strategy currently in effect for each service
    private final ConcurrentMap<String, LoadBalanceStrategy> globalServiceLoadBalance = new ConcurrentHashMap<>();

    /**
     * Creates a provider manager bound to the given registry server.
     *
     * @param defaultRegistryServer the owning registry server; this class reads its server
     *                              configuration and its consumer manager through it
     */
    public RegistryProviderManager(DefaultRegistryServer defaultRegistryServer) {
        this.defaultRegistryServer = defaultRegistryServer;
    }

    /**
     * Not implemented in this class: both arguments are ignored.
     *
     * @param request the incoming request (unused)
     * @param channel the channel the request arrived on (unused)
     * @return always {@code null}
     */
    public RemotingTransporter handleManager(RemotingTransporter request, Channel channel) throws RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
        return null;
    }

    /**
     * Handles a provider's service-registration request.
     * <p>
     * The request body is decoded into a {@link RegisterMeta}, the channel is tagged with it, and the
     * provider is recorded in {@code globalRegisterInfoMap}. The review state of a newly registered
     * provider is taken from the persisted history ({@code historyRecords}); when the history has no
     * entry for this service/address one is created with the configured default review state.
     * Existing history for the service (other providers, stored load-balance strategy) is preserved.
     * If the provider is already reviewed and approved, matching subscribers are notified.
     *
     * @param remotingTransporter the registration request; its body is a serialized {@link PublishServiceCustomBody}
     * @param channel             the provider's channel
     * @return an ACK response whose success flag is {@code true} once the registration has been stored
     * @throws InterruptedException         declared; presumably propagated from subscriber notification
     * @throws RemotingTimeoutException     declared; presumably propagated from subscriber notification
     * @throws RemotingSendRequestException declared; presumably propagated from subscriber notification
     */
    @Override
    public RemotingTransporter handlerRegister(RemotingTransporter remotingTransporter, Channel channel) throws RemotingSendRequestException,
            RemotingTimeoutException, InterruptedException {
        // Pessimistic default: the ack starts as "failed" and is only flipped to success after the
        // registration has actually been stored, so a provider retries on any early failure.
        AckCustomBody ackCustomBody = new AckCustomBody(remotingTransporter.getOpaque(), false);
        RemotingTransporter responseTransporter = RemotingTransporter.createResponseTransporter(UyuniProtocol.ACK, ackCustomBody,
                remotingTransporter.getOpaque());

        // Decode the request body and build the registration metadata.
        PublishServiceCustomBody publishServiceCustomBody = serializerImpl().readObject(remotingTransporter.bytes(), PublishServiceCustomBody.class);
        RegisterMeta meta = RegisterMeta.createRegiserMeta(publishServiceCustomBody, channel);
        if (logger.isDebugEnabled()) {
            logger.debug("Publish [{}] on channel[{}].", meta, channel);
        }
        // Tag the channel with the published service so it can be read back from the channel later
        // (e.g. when the channel becomes inactive).
        attachPublishEventOnChannel(meta, channel);

        // The service name is the smallest unit that identifies a service.
        final String serviceName = meta.getServiceName();
        final Address address = meta.getAddress();
        // All addresses currently providing this service, with their registration metadata.
        ConcurrentMap<Address, RegisterMeta> maps = this.getRegisterMeta(serviceName);

        //TODO consider whether a ReentrantLock could replace synchronized here
        synchronized (globalRegisterInfoMap) {
            // Only a provider that is not currently registered needs its review state resolved.
            if (null == maps.get(address)) {
                RegistryPersistRecord persistRecord = historyRecords.get(serviceName);
                if (null == persistRecord) {
                    // No history at all for this service: start a fresh record.
                    persistRecord = new RegistryPersistRecord();
                    persistRecord.setServiceName(serviceName);
                    // NOTE(review): hard-coded default; the configured default strategy is only used
                    // further below when the record carries no strategy — confirm this is intended.
                    persistRecord.setBalanceStrategy(LoadBalanceStrategy.WEIGHTINGRANDOM);
                    historyRecords.put(serviceName, persistRecord);
                }
                if (!isContainChildrenInfo(persistRecord, address)) {
                    // History exists but does not know this address: append it instead of replacing
                    // the record, so other providers' review states and the stored strategy survive.
                    PersistProviderInfo providerInfo = new PersistProviderInfo();
                    providerInfo.setAddress(address);
                    providerInfo.setIsReviewed(defaultRegistryServer.getRegistryServerConfig().getDefaultReviewState());

                    // Copy-on-write so a reader iterating the previous list is not disturbed.
                    List<PersistProviderInfo> mergedInfos = new ArrayList<>();
                    List<PersistProviderInfo> currentInfos = persistRecord.getProviderInfos();
                    if (null != currentInfos) {
                        mergedInfos.addAll(currentInfos);
                    }
                    mergedInfos.add(providerInfo);
                    persistRecord.setProviderInfos(mergedInfos);
                }

                // Copy the review state recorded for this address onto the new registration.
                for (PersistProviderInfo providerInfo : persistRecord.getProviderInfos()) {
                    if (providerInfo.getAddress().equals(address)) {
                        meta.setIsReviewed(providerInfo.getIsReviewed());
                    }
                }
                // Publish into the global registration table.
                maps.put(address, meta);
            }

            this.getServiceMeta(address).add(serviceName);

            // Strategy in effect: the one stored in history if present, otherwise the configured default.
            LoadBalanceStrategy balanceStrategy = defaultRegistryServer.getRegistryServerConfig().getDefaultLoadBalanceStrategy();
            RegistryPersistRecord recordedStrategyHolder = historyRecords.get(serviceName);
            if (null != recordedStrategyHolder && null != recordedStrategyHolder.getBalanceStrategy()) {
                balanceStrategy = recordedStrategyHolder.getBalanceStrategy();
            }
            globalServiceLoadBalance.put(serviceName, balanceStrategy);

            // The registration is stored; tell the provider it succeeded.
            ackCustomBody.setSuccess(true);

            // Only approved services are announced to subscribers.
            // NOTE(review): for an address that was already registered this checks the review state of
            // the freshly built meta, not the stored one — confirm that is intended.
            if (meta.getIsReviewed() == ServiceReviewState.PASS_REVIEW) {
                this.defaultRegistryServer.getRegistryConsumerManager().notifyMacthedSubscriber(meta, globalServiceLoadBalance.get(serviceName));
            }
        }
        // Remember which channel serves this address for use elsewhere.
        globalProviderChannelMetaMap.put(address, channel);
        return responseTransporter;
    }


    /**
     * Not implemented in this class: both arguments are ignored.
     *
     * @param metricsServiceName name of the service whose metrics are requested (unused)
     * @param requestId          id of the request (unused)
     * @return always {@code null}
     */
    public RemotingTransporter handleMetricsService(String metricsServiceName, long requestId) {
        return null;
    }

    /**
     * Intended to modify the load-balance strategy of a service. Not implemented in this class:
     * both arguments are ignored.
     *
     * @param opaque                   id of the request (unused)
     * @param managerServiceCustomBody body of the management request (unused)
     * @return always {@code null}
     */
    private RemotingTransporter handleModifyLoadBalance(long opaque, ManagerServiceCustomBody managerServiceCustomBody) {
        return null;
    }

    /**
     * Intended to modify the weight of a single service instance. Not implemented in this class:
     * both arguments are ignored and none of the declared exceptions is thrown.
     *
     * @param opaque                   id of the request (unused)
     * @param managerServiceCustomBody body of the management request (unused)
     * @return always {@code null}
     * @throws InterruptedException         declared but never thrown by this body
     * @throws RemotingTimeoutException     declared but never thrown by this body
     * @throws RemotingSendRequestException declared but never thrown by this body
     */
    private RemotingTransporter handleModifyWeight(long opaque, ManagerServiceCustomBody managerServiceCustomBody) throws RemotingSendRequestException,
            RemotingTimeoutException, InterruptedException {
        return null;
    }

    /**
     * Handles a provider's request to stop providing a service.
     * <p>
     * The request body is decoded into a {@link RegisterMeta}, the cancellation is applied through
     * {@link #handlePublishCancel(RegisterMeta, Channel)}, and the channel mapping of the provider's
     * address is dropped.
     *
     * @param request the cancellation request; its body is a serialized {@link PublishServiceCustomBody}
     * @param channel the provider's channel
     * @return an ACK response whose success flag is {@code true} once the cancellation has been applied
     * @throws InterruptedException         propagated from {@code handlePublishCancel}
     * @throws RemotingTimeoutException     propagated from {@code handlePublishCancel}
     * @throws RemotingSendRequestException propagated from {@code handlePublishCancel}
     */
    public RemotingTransporter handlerRegisterCancel(RemotingTransporter request, Channel channel) throws RemotingSendRequestException,
            RemotingTimeoutException, InterruptedException {
        // Pessimistic default: reply "failed" unless the cancellation below completes, so the provider retries.
        AckCustomBody ack = new AckCustomBody(request.getOpaque(), false);
        RemotingTransporter ackResponse = RemotingTransporter.createResponseTransporter(UyuniProtocol.ACK, ack, request.getOpaque());

        // Decode the body and rebuild the metadata that identifies the registration to cancel.
        PublishServiceCustomBody cancelBody = serializerImpl().readObject(request.bytes(), PublishServiceCustomBody.class);
        RegisterMeta cancelledMeta = RegisterMeta.createRegiserMeta(cancelBody, channel);

        handlePublishCancel(cancelledMeta, channel);
        ack.setSuccess(true);

        // NOTE(review): the channel mapping is removed even if this address still provides other
        // services — confirm against the users of globalProviderChannelMetaMap.
        globalProviderChannelMetaMap.remove(cancelledMeta.getAddress());
        return ackResponse;
    }

    /**
     * Handles a consumer's subscription to a service and returns the current provider list.
     * <p>
     * The channel is added to the consumer manager's subscriber channels, recorded as a subscriber
     * of the service in {@code globalConsumerMetaMap}, and tagged with the service name. The response
     * body lists the providers of the service that have passed review; it is left empty when the
     * service currently has no registered provider.
     *
     * @param request the subscription request; its body is a serialized {@link SubscribeRequestCustomBody}
     * @param channel the consumer's channel
     * @return a {@code SUBCRIBE_RESULT} response carrying the approved providers, if any
     */
    public RemotingTransporter handleSubscribe(RemotingTransporter request, Channel channel) {
        SubcribeResultCustomBody subcribeResultCustomBody = new SubcribeResultCustomBody();
        // Build the response up front; its body is filled in below.
        RemotingTransporter responseTransporter = RemotingTransporter.createResponseTransporter(UyuniProtocol.SUBCRIBE_RESULT, subcribeResultCustomBody,
                request.getOpaque());
        // Decode the request body.
        SubscribeRequestCustomBody requestCustomBody = serializerImpl().readObject(request.bytes(), SubscribeRequestCustomBody.class);
        String serviceName = requestCustomBody.getServiceName();
        // Add the channel to the consumer manager's subscriber channels.
        this.defaultRegistryServer.getRegistryConsumerManager().getSubscriberChannels().add(channel);

        // Record the consumer. putIfAbsent keeps this atomic: with a plain get/put, two consumers
        // subscribing to a new service at the same time could each create a set and one would
        // overwrite the other, silently dropping a subscriber.
        ConcurrentSet<Channel> channels = globalConsumerMetaMap.get(serviceName);
        if (null == channels) {
            ConcurrentSet<Channel> newChannels = new ConcurrentSet<>();
            channels = globalConsumerMetaMap.putIfAbsent(serviceName, newChannels);
            if (null == channels) {
                channels = newChannels;
            }
        }
        channels.add(channel);

        // Tag the channel with the service it subscribed to.
        attachSubscribeEventOnChannel(serviceName, channel);

        ConcurrentMap<Address, RegisterMeta> maps = this.getRegisterMeta(serviceName);
        // No provider yet: answer with the empty body.
        if (maps.isEmpty()) {
            return responseTransporter;
        }

        // Fill the response with the approved providers.
        buildSubcribeResultCustomBody(maps, subcribeResultCustomBody);
        return responseTransporter;
    }

    /**
     * Takes a provider's service offline.
     * <p>
     * Removes the publish tag from the channel, removes the provider's address from the service's
     * registration table and the service from the address's service set, and — if the removed
     * registration had passed review — notifies the matching subscribers of the cancellation.
     * Nothing beyond the channel tag removal happens when the service has no registered provider.
     *
     * @param meta    metadata identifying the service and provider address to cancel
     * @param channel the provider's channel
     * @throws InterruptedException         declared; presumably propagated from subscriber notification
     * @throws RemotingTimeoutException     declared; presumably propagated from subscriber notification
     * @throws RemotingSendRequestException declared; presumably propagated from subscriber notification
     */
    public void handlePublishCancel(RegisterMeta meta, Channel channel) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {
        if (logger.isDebugEnabled()) {
            logger.debug("Cancel publish {} on channel{}.", meta, channel);
        }

        // Drop the tag that was put on the channel at publish time.
        attachPublishCancelEventOnChannel(meta, channel);

        final String serviceName = meta.getServiceName();
        // Every address currently registered for this service.
        ConcurrentMap<Address, RegisterMeta> providersOfService = this.getRegisterMeta(serviceName);
        if (providersOfService.isEmpty()) {
            return;
        }

        synchronized (globalRegisterInfoMap) {
            Address providerAddress = meta.getAddress();
            RegisterMeta removed = providersOfService.remove(providerAddress);
            if (removed == null) {
                // This address was not registered for the service; nothing else to undo.
                return;
            }
            // The address no longer provides this service.
            this.getServiceMeta(providerAddress).remove(serviceName);
            // Subscribers only ever learned about approved registrations.
            if (removed.getIsReviewed() == ServiceReviewState.PASS_REVIEW) {
                this.defaultRegistryServer.getRegistryConsumerManager().notifyMacthedSubscriberCancel(meta);
            }
        }
    }

    /**
     * Persists the registry state to disk as JSON.
     * <p>
     * Merge rules:
     * <ol>
     * <li>Live state in {@code globalRegisterInfoMap} takes priority: each service there becomes a
     * record holding its current load-balance strategy and the review state of every registered
     * provider.</li>
     * <li>Services that only exist in {@code historyRecords} (registered before, but not since the
     * last restart) are carried over unchanged.</li>
     * <li>For services present in both, providers known only to the history are appended; for an
     * address present in both, the live review state wins. The load-balance strategy comes from the
     * live state, falling back to the history when the live state has none.</li>
     * </ol>
     * Nothing is written when there is no record at all.
     *
     * @throws IOException declared; presumably raised by {@code PersistUtils.string2File} when the write fails
     */
    public void persistServiceInfo() throws IOException {
        Map<String, RegistryPersistRecord> persistMap = new HashMap<>();

        // 1) Snapshot the live registrations. Entry sets are used so that an entry removed
        //    concurrently cannot produce a null lookup half-way through.
        for (Map.Entry<String, ConcurrentMap<Address, RegisterMeta>> serviceEntry : globalRegisterInfoMap.entrySet()) {
            String serviceName = serviceEntry.getKey();

            RegistryPersistRecord persistRecord = new RegistryPersistRecord();
            persistRecord.setServiceName(serviceName);
            persistRecord.setBalanceStrategy(globalServiceLoadBalance.get(serviceName));

            List<PersistProviderInfo> providerInfos = new ArrayList<>();
            for (Map.Entry<Address, RegisterMeta> providerEntry : serviceEntry.getValue().entrySet()) {
                PersistProviderInfo info = new PersistProviderInfo();
                info.setAddress(providerEntry.getKey());
                info.setIsReviewed(providerEntry.getValue().getIsReviewed());
                providerInfos.add(info);
            }
            persistRecord.setProviderInfos(providerInfos);
            persistMap.put(serviceName, persistRecord);
        }

        // 2) Merge in the history.
        for (Map.Entry<String, RegistryPersistRecord> historyEntry : historyRecords.entrySet()) {
            String serviceName = historyEntry.getKey();
            RegistryPersistRecord historyRecord = historyEntry.getValue();

            RegistryPersistRecord liveRecord = persistMap.get(serviceName);
            if (liveRecord == null) {
                // Known only to the history: keep it as it is.
                persistMap.put(serviceName, historyRecord);
                continue;
            }

            // A service can have a live (possibly empty) provider map without a strategy entry;
            // do not let that erase the strategy stored in the history.
            if (liveRecord.getBalanceStrategy() == null) {
                liveRecord.setBalanceStrategy(historyRecord.getBalanceStrategy());
            }

            List<PersistProviderInfo> historyInfos = historyRecord.getProviderInfos();
            if (historyInfos == null || historyInfos.isEmpty()) {
                continue;
            }

            // Append providers the live state does not know; same-address entries keep the live review state.
            List<PersistProviderInfo> mergedInfos = new ArrayList<>(liveRecord.getProviderInfos());
            Set<Address> knownAddresses = new HashSet<>();
            for (PersistProviderInfo liveInfo : mergedInfos) {
                knownAddresses.add(liveInfo.getAddress());
            }
            for (PersistProviderInfo historyInfo : historyInfos) {
                if (knownAddresses.add(historyInfo.getAddress())) {
                    mergedInfos.add(historyInfo);
                }
            }
            liveRecord.setProviderInfos(mergedInfos);
        }

        if (!persistMap.isEmpty()) {
            String jsonString = JSON.toJSONString(persistMap.values());
            if (jsonString != null) {
                PersistUtils.string2File(jsonString, this.defaultRegistryServer.getRegistryServerConfig().getStorePathRootDir());
            }
        }
    }


    /************************private method*****************************/

    /**
     * Tags the channel with a published service, so that the set of services published over a
     * channel can be read back from the channel itself (e.g. when it becomes inactive).
     *
     * @param meta    the registration to record on the channel
     * @param channel the provider's channel
     */
    private void attachPublishEventOnChannel(RegisterMeta meta, Channel channel) {
        Attribute<ConcurrentSet<RegisterMeta>> attr = channel.attr(S_PUBLISH_KEY);
        ConcurrentSet<RegisterMeta> registerMetaSet = attr.get();
        if (registerMetaSet == null) {
            ConcurrentSet<RegisterMeta> newRegisterMetaSet = new ConcurrentSet<>();
            registerMetaSet = attr.setIfAbsent(newRegisterMetaSet);
            // setIfAbsent returns null when our set was installed (there was no previous value);
            // without this fallback the first publish on a channel would hit a NullPointerException.
            if (registerMetaSet == null) {
                registerMetaSet = newRegisterMetaSet;
            }
        }
        registerMetaSet.add(meta);
    }

    /**
     * Removes a published-service tag from the channel. If the channel carries no tag set yet, an
     * empty one is installed first.
     *
     * @param meta    the registration to remove from the channel's tag set
     * @param channel the provider's channel
     */
    private void attachPublishCancelEventOnChannel(RegisterMeta meta, Channel channel) {
        Attribute<ConcurrentSet<RegisterMeta>> publishedAttr = channel.attr(S_PUBLISH_KEY);
        ConcurrentSet<RegisterMeta> published = publishedAttr.get();
        if (published == null) {
            ConcurrentSet<RegisterMeta> fresh = new ConcurrentSet<>();
            // A null result means our fresh set was installed; otherwise another caller's set is returned.
            ConcurrentSet<RegisterMeta> alreadyInstalled = publishedAttr.setIfAbsent(fresh);
            published = (alreadyInstalled != null) ? alreadyInstalled : fresh;
        }
        published.remove(meta);
    }

    /**
     * Returns the registration table of a service: every address currently providing it, mapped to
     * that address's registration metadata. An empty table is created and stored on first access.
     *
     * @param serviceName name of the service
     * @return the table for the service, never {@code null}
     */
    private ConcurrentMap<Address, RegisterMeta> getRegisterMeta(String serviceName) {
        ConcurrentMap<Address, RegisterMeta> existing = globalRegisterInfoMap.get(serviceName);
        if (existing != null) {
            return existing;
        }
        ConcurrentMap<Address, RegisterMeta> created = new ConcurrentHashMap<>();
        // If another thread stored a table in the meantime, use theirs.
        ConcurrentMap<Address, RegisterMeta> raced = globalRegisterInfoMap.putIfAbsent(serviceName, created);
        return raced != null ? raced : created;
    }

    /**
     * Tells whether a persisted record already has an entry for the given provider address.
     *
     * @param persistRecord the persisted record of a service
     * @param address       the provider address to look for
     * @return {@code true} if one of the record's provider entries has this address; {@code false}
     *         otherwise, including when the record has no provider list
     */
    private boolean isContainChildrenInfo(RegistryPersistRecord persistRecord, Address address) {
        List<PersistProviderInfo> providers = persistRecord.getProviderInfos();
        if (providers == null) {
            return false;
        }
        for (PersistProviderInfo provider : providers) {
            if (provider.getAddress().equals(address)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the set of service names provided by the node at the given address. An empty set is
     * created and stored on first access.
     *
     * @param address the provider address
     * @return the service-name set for the address, never {@code null}
     */
    private ConcurrentSet<String> getServiceMeta(Address address) {
        ConcurrentSet<String> existing = globalServiceMetaMap.get(address);
        if (existing != null) {
            return existing;
        }
        ConcurrentSet<String> created = new ConcurrentSet<>();
        // If another thread stored a set in the meantime, use theirs.
        ConcurrentSet<String> raced = globalServiceMetaMap.putIfAbsent(address, created);
        return raced != null ? raced : created;
    }


    /**
     * Returns the persisted history records, keyed by service name.
     *
     * @return the internal map itself, not a copy; changes made through it are seen by this manager
     */
    public ConcurrentMap<String, RegistryPersistRecord> getHistoryRecords() {
        return historyRecords;
    }

    /**
     * Tags the channel with the name of a service it subscribed to.
     *
     * @param serviceName name of the subscribed service
     * @param channel     the consumer's channel
     */
    private void attachSubscribeEventOnChannel(String serviceName, Channel channel) {
        Attribute<ConcurrentSet<String>> subscribedAttr = channel.attr(S_SUBSCRIBE_KEY);
        ConcurrentSet<String> subscribed = subscribedAttr.get();
        if (subscribed == null) {
            ConcurrentSet<String> fresh = new ConcurrentSet<>();
            // A null result means our fresh set was installed; otherwise another caller's set is returned.
            ConcurrentSet<String> alreadyInstalled = subscribedAttr.setIfAbsent(fresh);
            subscribed = (alreadyInstalled != null) ? alreadyInstalled : fresh;
        }
        subscribed.add(serviceName);
    }

    /**
     * Fills the subscription response with the providers that have passed review.
     * The response body is left untouched when the service has no registered provider.
     *
     * @param maps                     registration table of the subscribed service
     * @param subcribeResultCustomBody response body to fill
     */
    private void buildSubcribeResultCustomBody(ConcurrentMap<Address, RegisterMeta> maps, SubcribeResultCustomBody subcribeResultCustomBody) {
        Collection<RegisterMeta> registered = maps.values();
        if (registered.isEmpty()) {
            return;
        }

        List<RegisterMeta> approved = new ArrayList<>();
        for (RegisterMeta candidate : registered) {
            // Providers that have not passed review are not handed to the consumer.
            if (candidate.getIsReviewed() != ServiceReviewState.PASS_REVIEW) {
                continue;
            }
            approved.add(candidate);
        }
        subcribeResultCustomBody.setRegisterMeta(approved);
    }


}
